Kevin Mader
Spark East, NYC, 19 March 2015
The only technique which can do all
[1] Mokso et al., J. Phys. D, 46(49), 2013
Courtesy of M. Pistone at U. Bristol
If you looked at one 1000 x 1000 sized image every second, it would take you 139 hours to browse through a terabyte of data.
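The 139-hour figure can be checked with back-of-the-envelope arithmetic; the sketch below assumes 16-bit (2-byte) pixels, which is typical for tomography detectors but not stated on the slide.

```python
# Back-of-the-envelope check of the 139-hour claim,
# assuming 16-bit (2-byte) pixels (an assumption, not from the slide).
bytes_per_image = 1000 * 1000 * 2        # one 1000 x 1000, 16-bit image
images_in_tb = 1e12 / bytes_per_image    # images that fit in a terabyte
hours = images_in_tb / 3600              # at one image per second
print(round(hours))                      # -> 139
```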
| Year | Time to 1 TB | Manpower to keep up | Salary Costs / Month |
|---|---|---|---|
| 2000 | 4096 min | 2 people | 25 kCHF |
| 2008 | 1092 min | 8 people | 95 kCHF |
| 2014 | 32 min | 260 people | 3255 kCHF |
| 2016 | 2 min | 3906 people | 48828 kCHF |
\[ \textrm{Transistors} \propto 2^{T/(\textrm{18 months})} \]
Based on data from https://gist.github.com/humberto-ortiz/de4b3a621602b78bf90d
There are now many more transistors inside a single computer but the processing speed hasn't increased. How can this be?
The figure shows the range of cloud costs (determined by peak usage) compared to a local workstation, with utilization shown as the average number of hours the computer is used each week.
The figure shows the cost of a cloud-based solution as a percentage of the cost of buying a single machine; values below 1 are labeled with that percentage. The panels distinguish the average time to replacement for the machines, in months.
Here the equal-cost point is shown, where the cloud and local workstations cost the same. The x-axis is the percentage of resources used at peak time and the y-axis shows the expected usable lifetime of the computer. The color indicates the utilization percentage, and the text on the squares shows this as the number of hours used in a week.
What took an entire PhD 3-4 years ago can now be measured in a weekend, or even in a few seconds. Analysis tools have not kept up: they are difficult to customize and usually highly specific.
Data structures that were fast and efficient for computers with 640 KB of memory no longer make sense.
CPUs are not getting much faster, but there are a lot more of them. Iterating through a huge array takes almost as long on 2014 hardware as on 2006 hardware.
Google, Facebook, Yahoo, and Amazon had these, or very similar, problems years ago. They hired a lot of very competent computer science PhDs to solve them.
Cloud computing infrastructures are 10-100x cheaper than labor costs, usually more reliable than internal systems, and easily accessible from almost anywhere.
The most important job for any piece of analysis is to be correct.
Almost all image processing tasks require a number of people to evaluate and implement them, and they are almost always moving targets.
The last of the major priorities is speed, which covers scalability, raw performance, and development time.
The two frameworks provide a free out of the box solution for
These frameworks are really cool and Spark has a big vocabulary, but flatMap, filter, aggregate, join, groupBy, and fold still do not sound like anything I want to do to an image.
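Those primitives do, however, map naturally onto keyed image data; a minimal plain-Python stand-in (the tile values and the threshold are invented for illustration) shows what filter and map mean for an RDD of ((x, y), tile) pairs:

```python
# Plain-Python stand-ins for Spark's filter/map on an RDD of
# ((x, y), tile_mean) pairs; the values and threshold are invented.
images = [((0, 0), 12.0), ((0, 1), 85.0), ((1, 0), 40.0), ((1, 1), 97.0)]

bright = [kv for kv in images if kv[1] > 50]     # filter: keep bright tiles
binary = [(pos, 1.0) for pos, _ in bright]       # map: threshold to 1.0

print(binary)  # -> [((0, 1), 1.0), ((1, 1), 1.0)]
```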
I want to
We have developed a number of commands for the Spark Imaging Layer (SIL) to handle standard image processing tasks.
New components can be added, or imported directly from ImageJ. The resulting analyses are then parallelized by the Spark engine and run on multiple cores/CPUs locally, on a cluster or supercomputer, or even on a virtual cluster in the cloud, which can be started in seconds.
Combining many different components inside the Spark Shell, IPython, or Zeppelin makes it easier to assemble workflows.
We want to understand the relationship between genetic background and bone structure
Genetic studies require hundreds to thousands of samples, in this case the difference between 717 and 1200 samples is the difference between finding the links and finding nothing.
val bones = sc.loadImages("work/f2_bones/*/bone.tif")
bones.threshold(OTSU).invert.componentLabel // browse all samples in browser
val cells = sqlContext.csvFile("work/f2_bones/*/lacuna.csv")
val avgVol = sqlContext.sql("SELECT SAMPLE, AVG(VOLUME) FROM cells GROUP BY SAMPLE")
avgVol.filter(_._2 > 1000).map(sampleToPath).joinByKey(bones)
| Task | Single Core Time | Spark Time (40 cores) |
|---|---|---|
| Load and Preprocess | 360 minutes | 10 minutes |
| Single Column Average | 4.6s | 400ms |
| 1 K-means Iteration | 2 minutes | 1s |
\[ \textrm{Images}: \textrm{RDD}[((x,y,z),Img[Double])] =\\ \left[(\vec{x},\textrm{Img}),\cdots\right] \]
val dispField = Images.
  cartesian(Images).map {
    // compare every pair of images at their nominal offset
    case ((xA, imA), (xB, imB)) =>
      xcorr(imA, imB, in = xB - xA)
  }
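The all-pairs pattern can be sketched in plain Python, with strings standing in for images and a dummy xcorr that just records which pair was compared (all names here are illustrative):

```python
from itertools import product

# Scalar stand-ins for positioned images: (position, image) pairs.
images = [(0, "imA"), (1, "imB"), (2, "imC")]

def xcorr(im_a, im_b, offset):
    # Placeholder for a real cross-correlation; here we only record
    # which pair was compared and at what nominal offset.
    return (im_a, im_b, offset)

# cartesian + map: compare every image against every other one
disp_field = [xcorr(im_a, im_b, xb - xa)
              for (xa, im_a), (xb, im_b) in product(images, images)]

print(len(disp_field))  # -> 9 pairings for 3 images
```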
The positions can then be refined from the updated information provided by the cross-correlations, applying appropriate smoothing criteria where necessary.
The stitching itself, rather than rewriting the original data, can be done lazily as specific regions of the image are read.
def getView(tPos: Pos, tSize: Size) =
  stImgs.
    filter { case (x, img) => abs(x - tPos) < img.size }. // keep overlapping tiles
    map { case (x, img) =>
      val oImg = new Image(tSize) // blank output tile
      oImg.copy(img, x, tPos)     // paste the overlapping region
    }.addImages(AVG)              // average where tiles overlap
This also ensures the original data is left unaltered and all analysis is reversible.
getView(Pos(26.5,13),Size(2,2))
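The lazy-view idea can be sketched in plain Python with 1-D scalar tiles; positions, sizes, and the averaging rule are simplified stand-ins for the real image operations:

```python
# 1-D sketch of getView: keep only tiles overlapping the requested
# window, then average where tiles overlap. Tiles are (position, value).
tiles = [(0.0, 10.0), (1.0, 20.0), (5.0, 30.0)]
TILE_SIZE = 2.0

def get_view(t_pos):
    # filter: tiles whose extent overlaps the requested position
    hits = [v for x, v in tiles if abs(x - t_pos) < TILE_SIZE]
    # addImages(AVG): average the overlapping contributions
    return sum(hits) / len(hits)

print(get_view(0.5))  # -> 15.0 (average of the two overlapping tiles)
```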
In the biological imaging community, the open source tools of ImageJ2 and Fiji are widely accepted and have a large number of readily available plugins and tools.
We can integrate the functionality directly into Spark and perform operations on much larger datasets than a single machine could hold in memory. Additionally, these analyses can be performed on streaming data.
val wr = new WebcamReceiver()
val ssc = sc.toStreaming(strTime)
val allImgs = ssc.receiverStream(wr)
val filtImgs = allImgs.mapValues(_.run("Median...","radius=3"))
// estimate the background as the mean of a static RDD of frames, inImages
val totImgs = inImages.count()
val bgImage = inImages.reduce(_ add _).multiply(1.0 / totImgs)
val eventImages = filtImgs.
  transform {
    inImages =>
      inImages.map {
        case (inTime, inImage) =>
          val corImage = inImage.subtract(bgImage) // background-corrected frame
          (corImage.getImageStatistics().mean,
            (inTime, corImage))
      }
  }
eventImages.filter(iv => Math.abs(iv._1)>20).
foreachRDD(showResultsStr("outlier",_))
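The same background-subtract-and-threshold logic, sketched in plain Python over a list standing in for the stream; each frame is reduced to its mean intensity, the threshold of 20 comes from the slide, and all the frame values are invented:

```python
# Each frame is reduced to its mean intensity: (time, mean) pairs.
frames = [(0, 100.0), (1, 101.0), (2, 99.0), (3, 150.0)]

# Background is the average frame (cf. reduce(add) / count above).
bg = sum(v for _, v in frames) / len(frames)

# Subtract the background and keep frames deviating by more than 20.
events = [(t, v - bg) for t, v in frames if abs(v - bg) > 20]

print(events)  # -> [(3, 37.5)]: only the bright outlier frame survives
```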
Apache Spark is a brilliant platform, and by utilizing GraphX, MLlib, and the other packages there are nearly unlimited possibilities.
def spread_voxels(pvec: ((Int, Int), Double), windSize: Int = 1) = {
  val wind = -windSize to windSize
  val pos = pvec._1
  // divide the intensity evenly over the (2 * windSize + 1)^2 neighborhood
  val scalevalue = pvec._2 / (wind.length * wind.length)
  for (x <- wind; y <- wind)
    yield ((pos._1 + x, pos._2 + y), scalevalue)
}
val filtImg = roiImg.
  flatMap(cvec => spread_voxels(cvec)).
  filter(roiFun).
  reduceByKey(_ + _)
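The same spread-and-sum pattern in plain Python, with a dict playing the role of reduceByKey (the two input voxels are toy values):

```python
def spread_voxels(pos, value, wind_size=1):
    wind = range(-wind_size, wind_size + 1)
    n = len(wind) ** 2
    # divide the intensity evenly over the (2*wind_size+1)^2 neighborhood
    return [((pos[0] + x, pos[1] + y), value / n) for x in wind for y in wind]

# flatMap + reduceByKey: spread two voxels, then sum overlapping keys
voxels = [((1, 1), 9.0), ((2, 1), 9.0)]
acc = {}
for pos, v in voxels:
    for key, sv in spread_voxels(pos, v):
        acc[key] = acc.get(key, 0.0) + sv

print(acc[(2, 1)])  # -> 2.0: both 3x3 windows cover (2, 1)
```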
Here we use a KNIME-based workflow with our Spark Imaging Layer extensions to create an analysis without any Scala or programming knowledge, with an easily visible flow from one block to the next, and without the performance overhead of using other tools.
A spinoff - 4Quant: From images to insight
For many datasets, processing, segmentation, and morphological analysis extract all the information needed. For many systems, like bone tissue, cellular tissues, and cellular materials, the structure is just the beginning: the most interesting results come from applying physical, chemical, or biological rules inside these structures.
\[ \sum_j \vec{F}_{ij} = m\ddot{x}_i \]
Such systems can be easily represented by a graph, and analyzed using GraphX in a distributed, fault tolerant manner.
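A minimal plain-Python sketch of the force balance \( \sum_j \vec{F}_{ij} = m\ddot{x}_i \) on such a graph; the 1-D layout, spring constant, and rest length are all invented, and GraphX would distribute the same aggregate-over-neighbors step:

```python
# Nodes with 1-D positions; edges are springs with rest length 1.
pos = {0: 0.0, 1: 1.5, 2: 3.0}
edges = [(0, 1), (1, 2)]
k = 2.0  # spring constant (invented)

def net_force(i):
    # sum_j F_ij: Hooke's law contribution from every neighbor of node i
    f = 0.0
    for a, b in edges:
        if i in (a, b):
            j = b if i == a else a
            d = pos[j] - pos[i]
            f += k * (d - (1.0 if d > 0 else -1.0))  # stretch beyond rest
    return f

print(net_force(1))  # -> 0.0: node 1 is pulled equally from both sides
```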
The bottleneck is the filesystem connection: many nodes (10+) reading in parallel bring even a GPFS-based InfiniBand system to a crawl.
One of the central tenets of MapReduce™ is data-centric computation \( \rightarrow \) instead of moving data to the computation, move the computation to the data.